package com.markhsiu.minimq.remote.transport.talentaio.handler;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.markhsiu.minimq.core.exeption.MiniMQException;
import com.markhsiu.minimq.message.Message;
import com.markhsiu.minimq.message.constant.MessageSourceEnum;
import com.markhsiu.minimq.remote.processor.MessageProcessor;
import com.markhsiu.minimq.remote.transport.talentaio.MessagePacket;
import com.markhsiu.minimq.remote.transport.talentaio.TalenAioChannelAdapterProcessor;
import com.markhsiu.minimq.serialize.SerializeCodec;
import com.markhsiu.minimq.serialize.protostuff.ProtostuffCodec;
import com.talent.aio.common.ChannelContext;
import com.talent.aio.server.intf.ServerAioHandler;

public class MessageServerAioHandler extends MessageAbsAioHandler
		implements ServerAioHandler<Object, MessagePacket, Object> {
	private static Logger logger = LoggerFactory.getLogger(MessageServerAioHandler.class);
	private static boolean isDebugEnabled = logger.isDebugEnabled();
	
	private static final AtomicInteger count = new AtomicInteger(1);
	private Map<MessageSourceEnum, MessageProcessor> handlers;
	
	@SuppressWarnings("unused")
	private  MessageServerAioHandler() {}
	
	public MessageServerAioHandler(Map<MessageSourceEnum, MessageProcessor> handlers){
		this.handlers = handlers;
	}
	
	/**
	 * 处理消息
	 */
	@Override
	public Object handler(MessagePacket packet, ChannelContext<Object, MessagePacket, Object> channel)
			throws Exception {
		
		byte[] body = packet.getBody();
		if (body != null) {
			handler(body, channel, packet);
		}
		return null;
	}
	
	 private void handler(byte[] body,ChannelContext<Object, MessagePacket, Object> channel, MessagePacket  packet){
	    	if(isDebugEnabled){
	    		logger.debug("count = ",count.getAndIncrement());
	    	}
		 	
			SerializeCodec<Message> codec = ProtostuffCodec.OneInstance();
			Message message = null;
			try {
				message = codec.deserialize(body);
			} catch (IOException e) {
				e.printStackTrace();
			}
			if(message == null || message.getMessageID() == null){
				  throw new MiniMQException("message or messageID is null");
			}
			if(isDebugEnabled){
				logger.debug("收到消息：{} " , message);
			}
			
	    	 
	    	MessageProcessor messageProcessor =  handlers.get(message.getSource());
	    	if(messageProcessor == null){
	    		messageProcessor = handlers.get(MessageProcessor.DEFAULT);
	    	}
	    	messageProcessor.handler(message, new TalenAioChannelAdapterProcessor(channel,codec));
	    }
}
